package com.softwin.cryptocurrency.demo.marketMaking;

import com.softwin.cryptocurrency.demo.security.JwtCallCredential;
import com.softwin.grpc.common.Common;
import com.softwin.marketdata.gateway.service.MarketDataManageGrpc;
import com.softwin.marketdata.gateway.service.MarketGateway;
import com.softwin.order.reporting.service.*;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal market-making demo that talks to a market-data gateway and a trading gateway
 * over gRPC.
 *
 * <p>Strategy: keep one limit buy at about 1% below and one limit sell at about 1% above the
 * latest close price. When a resting order is more than 0.3% away from its target price it
 * is cancelled and a new order is placed.
 *
 * <p>Threading: the tick stream, the trade stream and every insert/cancel request are
 * separate asynchronous gRPC calls, so their callbacks may be delivered on different
 * threads. All mutable strategy state is therefore read and written only while holding
 * {@link #stateLock}.
 *
 * <p>Created by nirui.
 */
public class MarketMaker {
    /** Backlog of not-yet-confirmed cancels above which no new quotes are placed. */
    private static final int MAX_PENDING_CANCELS = 3;
    /** Minimum time between two cancel requests for the same order, in milliseconds. */
    private static final long CANCEL_RETRY_INTERVAL_MS = 3000;

    // NOTE(review): endpoint, account and credentials are hard-coded in source; consider
    // moving them to configuration before using this outside of a demo.
    private final String host = "140.207.81.249"; // gateway server address
    private final OrderReportingGrpc.OrderReportingStub tgStub; // trading gateway (async stub)
    private final MarketDataManageGrpc.MarketDataManageStub mgStub; // market-data gateway (async stub)
    private final Common.Symbol btcusdtSymbol; // traded currency pair
    private final String subaccountCode = "huobi_a_001"; // sub-account
    private final String username = "openTrader"; // trader user name
    private final String password = "66666"; // trader password
    private final String quantity = "0.001"; // quantity of every order

    /**
     * Guards {@link #buyOrder}, {@link #sellOrder}, {@link #canceledOrderMap} and
     * {@link Order#lastCancelTime}. The lock is reentrant, so helpers that take it may be
     * called from code that already holds it.
     */
    private final Object stateLock = new Object();

    // Currently resting buy order: null = none, Order.EMPTY = insert request in flight.
    private Order buyOrder = null;
    // Currently resting sell order: null = none, Order.EMPTY = insert request in flight.
    private Order sellOrder = null;

    // Distance of the quotes from the close price, as a fraction (0.01 = 1%).
    private final double priceInterval = 0.01;
    // Tolerated deviation from the target price before an order is re-quoted (0.003 = 0.3%).
    private final double priceOffset = 0.003;
    // Orders for which a cancel was requested but not yet confirmed, keyed by order id.
    // Kept so that unconfirmed cancels can be retried and the backlog can be bounded.
    private final Map<String, Order> canceledOrderMap = new HashMap<>();

    /**
     * Starts the strategy, lets it run for 200 seconds and then returns.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        MarketMaker cma = new MarketMaker();
        cma.subMarketData();
        Thread.sleep(200000);
    }

    /**
     * Opens plaintext channels to both gateways, logs in to obtain a token, builds the
     * authenticated trading stub and subscribes to trade reports.
     */
    private MarketMaker() {
        ManagedChannel mgChannel = ManagedChannelBuilder.forAddress(host, 8991).usePlaintext().build();
        // Market-data gateway stub.
        mgStub = MarketDataManageGrpc.newStub(mgChannel);
        ManagedChannel tgChannel = ManagedChannelBuilder.forAddress(host, 8993).usePlaintext().build();
        // Blocking login call; the returned token authenticates every trading call.
        LoginGrpc.LoginBlockingStub loginBlockingStub = LoginGrpc.newBlockingStub(tgChannel);
        LoginOuterClass.RspLogin rspLogin = loginBlockingStub.login(
                LoginOuterClass.ReqLogin.newBuilder().setUsername(username).setPassword(password).build());
        String token = rspLogin.getToken();
        // Trading gateway stub carrying the token as call credentials.
        tgStub = OrderReportingGrpc.newStub(tgChannel).withCallCredentials(new JwtCallCredential(token));

        btcusdtSymbol = Common.Symbol.newBuilder()
                .setQuoteCurrency("usdt").setBaseCurrency("btc").setExchangeType("huobi").build();
        init();
    }

    /**
     * Subscribes to trade reports and keeps {@link #buyOrder}, {@link #sellOrder} and
     * {@link #canceledOrderMap} up to date when a fill arrives.
     */
    private void init() {
        StreamObserver<Orderreporting.ReqSubTrade> reqSubTradeStreamObserver =
                tgStub.subTrade(new StreamObserver<Orderreporting.RtnSubTrade>() {
            @Override
            public void onNext(Orderreporting.RtnSubTrade value) {
                System.out.println(value);
                // Only complete fills are handled; partial fills and fees are ignored.
                if (new BigDecimal(value.getTrade().getQuantity()).compareTo(new BigDecimal(quantity)) != 0) {
                    return;
                }
                synchronized (stateLock) {
                    if (value.getTrade().getSide().equals("buy")) {
                        System.out.println("买单已成交");
                        if (buyOrder != null && value.getTrade().getOrderId().equals(buyOrder.orderId)) {
                            buyOrder = null;
                        } else {
                            // Not the resting order: it may be one we were trying to cancel.
                            canceledOrderMap.remove(value.getTrade().getOrderId());
                        }
                    } else if (value.getTrade().getSide().equals("sell")) {
                        System.out.println("卖单已成交");
                        if (sellOrder != null && value.getTrade().getOrderId().equals(sellOrder.orderId)) {
                            sellOrder = null;
                        } else {
                            canceledOrderMap.remove(value.getTrade().getOrderId());
                        }
                    }
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                // Nothing to do.
            }
        });
        // Required: the subscription only starts once a request is sent on the stream.
        reqSubTradeStreamObserver.onNext(Orderreporting.ReqSubTrade.newBuilder().build());
    }

    /**
     * Sends a limit buy order; on a successful response the order becomes {@link #buyOrder},
     * on failure {@link #buyOrder} is reset to {@code null}.
     *
     * @param price limit price
     */
    private void buy(double price) {
        placeOrder(true, price);
    }

    /**
     * Sends a limit sell order; on a successful response the order becomes
     * {@link #sellOrder}, on failure {@link #sellOrder} is reset to {@code null}.
     *
     * @param price limit price
     */
    private void sell(double price) {
        placeOrder(false, price);
    }

    /**
     * Shared implementation of {@link #buy(double)} and {@link #sell(double)}.
     *
     * @param isBuy {@code true} for a buy order, {@code false} for a sell order
     * @param price limit price
     */
    private void placeOrder(final boolean isBuy, final double price) {
        final Orderreporting.InsertOrderRequest orderTradeRequest = Orderreporting.InsertOrderRequest.newBuilder()
                .setSubaccountCode(subaccountCode).setSymbol(btcusdtSymbol).setQuantity(quantity)
                .setPrice(String.valueOf(price)).setType("limit").setSide(isBuy ? "buy" : "sell").build();
        System.out.println((isBuy ? "报单买入： 价格" : "报单卖出： 价格") + price);
        tgStub.insertOrder(orderTradeRequest, new StreamObserver<Orderreporting.InsertOrderResponse>() {
            @Override
            public void onNext(Orderreporting.InsertOrderResponse value) {
                if (!value.getCRsp().getIsSuccess()) {
                    System.out.println("报单失败: " + value.getCRsp().getErrMsg());
                    setWorkingOrder(isBuy, null);
                } else {
                    // Replace the in-flight placeholder with the real order and its id.
                    setWorkingOrder(isBuy, new Order(price, Double.parseDouble(quantity), value.getOrderId(),
                            isBuy ? Order.TYPE_BUY : Order.TYPE_SELL));
                    System.out.println("报单成功：" + value.getOrderId());
                }
            }

            @Override
            public void onError(Throwable t) {
                System.out.println("报单失败: " + t.getMessage());
                setWorkingOrder(isBuy, null);
            }

            @Override
            public void onCompleted() {
                // Nothing to do.
            }
        });
    }

    /** Stores {@code order} as the resting buy or sell order under {@link #stateLock}. */
    private void setWorkingOrder(boolean isBuy, Order order) {
        synchronized (stateLock) {
            if (isBuy) {
                buyOrder = order;
            } else {
                sellOrder = order;
            }
        }
    }

    /**
     * Requests cancellation of an order that is registered in {@link #canceledOrderMap}.
     * The request time is recorded so that an unconfirmed cancel can be retried later; the
     * order is removed from the map once the gateway confirms the cancel. Does nothing if
     * the order is not in the map.
     *
     * @param orderId id of the order to cancel
     */
    private void cancel(String orderId) {
        synchronized (stateLock) {
            Order order = canceledOrderMap.get(orderId);
            if (order == null) {
                return;
            }
            order.lastCancelTime = System.currentTimeMillis();
        }
        final Orderreporting.CancelOrderRequest cancelOrderRequest =
                Orderreporting.CancelOrderRequest.newBuilder().setOrderId(orderId).build();
        tgStub.cancelOrder(cancelOrderRequest, new StreamObserver<Orderreporting.CancelOrderResponse>() {
            @Override
            public void onNext(Orderreporting.CancelOrderResponse value) {
                if (value.getCRsp().getIsSuccess()) {
                    synchronized (stateLock) {
                        canceledOrderMap.remove(value.getData().getOrderId());
                    }
                    System.out.println("取消订单成功 orderId:" + value.getData().getOrderId());
                } else {
                    System.out.println("撤单失败： " + value.getCRsp().getErrMsg());
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                // Nothing to do.
            }
        });
    }

    /**
     * Subscribes to ticks of the traded pair. On every successful tick: retries overdue
     * cancels, stops if too many cancels are still unconfirmed, and otherwise re-quotes the
     * buy and sell orders around the tick's close price where needed.
     */
    private void subMarketData() {
        StreamObserver<MarketGateway.ReqSubTick> requestObserver =
                mgStub.subTick(new StreamObserver<MarketGateway.TickDataResponse>() {
            @Override
            public void onNext(MarketGateway.TickDataResponse rsp) {
                if (!rsp.getCRsp().getIsSuccess()) {
                    return;
                }
                MarketGateway.TickData value = rsp.getData();
                synchronized (stateLock) {
                    // Retry before throttling: otherwise a full backlog of failed cancels
                    // would never be retried and quoting would stall permanently.
                    retryStaleCancels();
                    // Too many unconfirmed cancels: wait. If this persists, something is wrong.
                    if (canceledOrderMap.size() > MAX_PENDING_CANCELS) {
                        System.out.println("canceledOrderSize is reach max, count:" + canceledOrderMap.size());
                        return;
                    }
                    double closePrice;
                    try {
                        closePrice = Double.valueOf(value.getClose());
                    } catch (NumberFormatException e) {
                        // Skip the tick instead of throwing out of the stream callback.
                        System.out.println("ignore tick with unparsable close price: " + value.getClose());
                        return;
                    }
                    if (Double.isNaN(closePrice) || Double.isInfinite(closePrice) || closePrice <= 0) {
                        System.out.println("ignore tick with invalid close price: " + closePrice);
                        return;
                    }
                    requote(true, closePrice);
                    requote(false, closePrice);
                }
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                // Nothing to do.
            }
        });
        MarketGateway.ReqSubTick reqSubTick = MarketGateway.ReqSubTick.newBuilder().setSymbol(btcusdtSymbol).build();
        requestObserver.onNext(reqSubTick);
    }

    /**
     * Re-sends the cancel request for every pending order whose last request is older than
     * {@link #CANCEL_RETRY_INTERVAL_MS}. Iterates over a snapshot so that the map may change
     * while requests are being sent.
     */
    private void retryStaleCancels() {
        synchronized (stateLock) {
            long now = System.currentTimeMillis();
            for (Order pending : canceledOrderMap.values().toArray(new Order[0])) {
                if (now - pending.lastCancelTime > CANCEL_RETRY_INTERVAL_MS) {
                    cancel(pending.orderId);
                }
            }
        }
    }

    /**
     * Makes sure one side is quoted at {@code closePrice * (1 -/+ priceInterval)}. If there
     * is no resting order, or the resting order is outside the {@code priceOffset} band
     * around the target, the resting order (if any) is queued for cancellation and a new
     * order is sent. Nothing happens while an insert for this side is still in flight.
     *
     * @param isBuy      {@code true} for the buy side, {@code false} for the sell side
     * @param closePrice latest close price, finite and positive
     */
    private void requote(boolean isBuy, double closePrice) {
        synchronized (stateLock) {
            Order current = isBuy ? buyOrder : sellOrder;
            if (current == Order.EMPTY) {
                return; // insert request still in flight
            }
            double center = isBuy ? 1 - priceInterval : 1 + priceInterval;
            if (current != null) {
                boolean tooHigh = current.price > closePrice * (center + priceOffset);
                boolean tooLow = current.price < closePrice * (center - priceOffset);
                if (!tooHigh && !tooLow) {
                    return; // still within tolerance
                }
                canceledOrderMap.put(current.orderId, current);
                cancel(current.orderId);
            }
            // Target price rounded to two decimals.
            double price = new BigDecimal(String.valueOf(closePrice * center))
                    .setScale(2, RoundingMode.HALF_UP).doubleValue();
            // Mark the side as "insert in flight" before sending, so that later ticks do
            // not place a second order before the response arrives.
            if (isBuy) {
                buyOrder = Order.EMPTY;
                buy(price);
            } else {
                sellOrder = Order.EMPTY;
                sell(price);
            }
        }
    }

    /** Local record of an order placed by this strategy. */
    static class Order {
        public static final int TYPE_BUY = 1;
        public static final int TYPE_SELL = 2;
        /** Placeholder meaning "insert request sent, response not yet received". */
        public static final Order EMPTY = new Order(0, 0, "", 0);

        public double price;
        public double amount;
        public String orderId;
        public int type;
        /** Time of the most recent cancel request, from {@code System.currentTimeMillis()}. */
        public long lastCancelTime;

        public Order(double price, double amount, String orderId, int type) {
            this.price = price;
            this.amount = amount;
            this.orderId = orderId;
            this.type = type;
        }
    }
}

